import time
import datetime
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import seaborn as sb
import numpy as np
import random
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing
from sklearn.neighbors import KNeighborsRegressor
from sklearn.model_selection import train_test_split,GridSearchCV
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import Lasso
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
import pickle
import yfinance as yf
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint
from tensorflow import random
# Download daily S&P 500 (^GSPC) history from Yahoo Finance and cache it as CSV.
ticker = '^GSPC'
# Yahoo's download endpoint takes the date range as Unix timestamps.
period1 = int(time.mktime(datetime.datetime(1927, 12, 29, 23, 59).timetuple()))
period2 = int(time.mktime(datetime.datetime(2023, 1, 16, 23, 59).timetuple()))
interval = '1d'
query_string = f'https://query1.finance.yahoo.com/v7/finance/download/{ticker}?period1={period1}&period2={period2}&interval={interval}&events=history&includeAdjustedClose=true'
df = pd.read_csv(query_string)
# to_csv with a file path returns None, so the original `csv_data = ...`
# assignment stored nothing useful; just write the cache file.
df.to_csv('SNP.csv', header=True, index=False)
snpdf = pd.read_csv("SNP.csv")
# These expressions only auto-display inside a notebook cell; in a plain
# script their results are discarded unless printed explicitly.
print(snpdf.head())
print(snpdf.shape)
print(snpdf.describe())
snpdf.info()  # info() prints directly and returns None
print(snpdf.isnull().sum())
There are no null values in the dataset.
# Visually compare Close and Adj Close; for ^GSPC the two series coincide.
plt.figure(figsize=(15, 6))
plt.plot(snpdf['Close'], 'red', label='Close')
plt.plot(snpdf['Adj Close'], 'yellow', label='Adj Close')
# The x-axis is the row index and the y-axis is price; the original labels
# ('Close' / 'Adj Close') described neither axis.
plt.xlabel('Row index')
plt.ylabel('Price')
plt.legend()
plt.show()
# True when the two columns are element-wise identical; print it so the
# result is visible when run as a script.
print(snpdf['Close'].equals(snpdf['Adj Close']))
From both of the tests above, we can see that the values of Close and Adj Close are the same, and hence we can drop one of the columns.
# Close and Adj Close are identical, so keep only Close.
del snpdf['Adj Close']
print(snpdf.head())
snpdf['Date'] = pd.to_datetime(snpdf['Date'])
# Plot against the actual dates: the original passed `dt.year`, which
# stacked every daily point of a year onto the same integer x value,
# producing one vertical band per year instead of a time series.
fig = px.line(snpdf, x='Date', y='Close')
fig.update_layout(title='Closing Price of S&P500 over years',
                  xaxis_title='Year',
                  yaxis_title='Closing Price')
fig.show()
Reasons behind the dips and growth of the S&P 500 throughout the years
# One box plot per numeric column to eyeball outliers.
features = ['Open', 'High', 'Low', 'Close', 'Volume']
for col in features:
    # Pass the column NAME so plotly labels the axis itself; the original
    # passed the Series (losing the label) and enumerated an index `i`
    # that was never used.
    duration_box = px.box(snpdf, x=col, height=200)
    duration_box.show()
From the boxplots above, we can see that only the Volume column has outlier values. The rest of the columns do not have a significant number of outliers.
ARIMA
# --- ARIMA(1,1,1) forecast of the closing price ---
arpdf = snpdf.copy()
arpdf['Date'] = pd.to_datetime(arpdf['Date'])
arpdf.set_index('Date', inplace=True)
# Reindex to business-day frequency. This inserts NaN rows for market
# holidays, which the original never filled and which break/degrade the
# ARIMA fit — forward-fill them with the previous trading day's values.
arpdf = arpdf.asfreq('B').ffill()
# Hold out the last 365 business-day rows as the test set.
train = arpdf.iloc[:-365]
test = arpdf.iloc[-365:]
arimamodel = ARIMA(train['Close'], order=(1, 1, 1))
model_fit = arimamodel.fit()
# The modern statsmodels ARIMA predicts in levels by default; the legacy
# `typ='levels'` argument belonged to the removed ARIMA implementation.
predictions = model_fit.predict(start=len(train), end=len(train) + len(test) - 1)
plt.plot(test['Close'], label='Actual')
plt.plot(predictions, label='Predicted')
plt.legend()
plt.show()
Linear Regression
# Linear-regression model of the daily up/down trend.
# NOTE(review): lrdf is an ALIAS of snpdf, so the Trend column added below is
# also visible to every later section that reads snpdf (e.g. the random-forest
# features) — confirm that is intended before changing it to a copy.
lrdf=snpdf
# 1 for increasing trend, 0 for same or decreasing trend, as compared to previous day.
# diff() leaves NaN in the first row, but NaN > 0 is False, so it maps to 0
# and the dropna() below is effectively a no-op.
lrdf["Trend"] = lrdf["Close"].diff().apply(lambda x:1 if x>0 else 0)
lrdf.dropna(inplace=True)
# NOTE(review): Close is both a feature and the source of the Trend label,
# so the model effectively sees the answer — treat these scores as an
# optimistic upper bound.
feature = ["Open", "Close", "High", "Low"]
X = lrdf[feature]
y = lrdf["Trend"]
# shuffle=False preserves chronological order for the time series.
X_train_lr, X_test_lr, y_train_lr, y_test_lr = train_test_split(X, y, test_size=0.2, shuffle=False)
lrmodel = LinearRegression()
lrmodel.fit(X_train_lr, y_train_lr)
y_pred_lr = lrmodel.predict(X_test_lr)
y_pred_lr = np.exp(y_pred_lr) / (1 + np.exp(y_pred_lr)) # Sigmoid squashes the raw regression output into (0, 1)
MSE = mean_squared_error(y_test_lr, y_pred_lr)
print("Mean Square Error:", MSE)
# NOTE(review): snpdf has a RangeIndex, so `dates` holds integer row
# positions, not dates, despite the "Date" column name below.
dates = X_test_lr.index
pred_lrdf = pd.DataFrame({"Date": dates, "Trend": y_pred_lr})
# Plot the predicted trend values over the test period.
plt.plot(pred_lrdf["Date"], pred_lrdf["Trend"])
plt.title("Predicted Trend of S&P 500 Index")
plt.xlabel("Index")
plt.ylabel("Trend")
plt.show()
# Overlay actual 0/1 labels against the sigmoid-scaled predictions.
plt.figure(figsize=(10,5))
plt.plot(y_test_lr.values, label="Actual value")
plt.plot(y_pred_lr, label="Predicted value")
plt.legend()
plt.show()
kNN
# --- kNN regression of Close from the other price/volume columns ---
# Chronological 80/20 split (shuffle=False keeps time order; the
# random_state is therefore inert but kept for parity).
X_train, X_test, y_train, y_test = train_test_split(
    snpdf[['Open', 'Low', 'High', 'Volume']], snpdf['Close'],
    test_size=0.2, random_state=10, shuffle=False)
# Attempt 1: five nearest neighbours on the raw, unscaled features.
k = 5
knn = KNeighborsRegressor(n_neighbors=k)
knn.fit(X_train, y_train)
y_pred = knn.predict(X_test)
mse_knn1 = mean_squared_error(y_test, y_pred)
print("Mean Squared Error: ", mse_knn1)
# Attempt 2: identical model, but standardise the features first so that
# the distance metric is not dominated by the huge Volume magnitudes.
X_train, X_test, y_train, y_test = train_test_split(
    snpdf[['Open', 'Low', 'High', 'Volume']], snpdf['Close'],
    test_size=0.2, random_state=10, shuffle=False)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
k = 5
knn = KNeighborsRegressor(n_neighbors=k)
knn.fit(X_train_scaled, y_train)
y_pred = knn.predict(X_test_scaled)
mse_knn2 = mean_squared_error(y_test, y_pred)
print("Mean Squared Error: ", mse_knn2)
# --- Hyper-parameter search for the scaled kNN model ---
X_train, X_test, y_train, y_test = train_test_split(
    snpdf[['Open', 'Low', 'High', 'Volume']], snpdf['Close'],
    test_size=0.2, random_state=10, shuffle=False)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Search neighbour count, vote weighting, and Minkowski power
# (p=1 manhattan, p=2 euclidean).
param_grid = {
    "n_neighbors": [3, 5, 7, 9],
    "weights": ["uniform", "distance"],
    "p": [1, 2]
}
knn = KNeighborsRegressor()
grid_search = GridSearchCV(knn, param_grid, cv=5)
grid_search.fit(X_train_scaled, y_train)
print("Best hyperparameters for kNN:", grid_search.best_params_)
# GridSearchCV already refits the winning configuration on the whole
# training set (refit=True by default), so reuse that estimator instead
# of training a duplicate model as the original did.
best_knn = grid_search.best_estimator_
y_pred = best_knn.predict(X_test_scaled)
mse_knn3 = mean_squared_error(y_test, y_pred)
print("Mean Squared Error: ", mse_knn3)
plt.scatter(y_test, y_pred)
# Label the axes so the scatter is interpretable.
plt.xlabel('Actual Close')
plt.ylabel('Predicted Close')
plt.title('kNN Performance')
plt.show()
Lasso Regression
# --- Lasso regression, two targets ---
lsdf = snpdf
# Target 1: predict Volume from the four price columns (alpha = 0.5).
X_train, X_test, y_train, y_test = train_test_split(
    lsdf[['Open', 'Close', 'Low', 'High']], lsdf['Volume'],
    test_size=0.3, random_state=42, shuffle=False)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
lasso = Lasso(alpha=0.5, max_iter=100000)
lasso.fit(X_train, y_train)
y_pred = lasso.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print('Mean Squared Error: ', mse)
# Target 2: predict Close from the remaining columns (alpha = 0.1).
X_train, X_test, y_train, y_test = train_test_split(
    lsdf[['Open', 'Low', 'High', 'Volume']], lsdf['Close'],
    test_size=0.3, random_state=42, shuffle=False)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
lasso2 = Lasso(alpha=0.1, max_iter=10000)
lasso2.fit(X_train, y_train)
y_pred = lasso2.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print('Mean Squared Error: ', mse)
# --- Cross-validated alpha search for a scaled Lasso predicting Close ---
X = snpdf[['Low', 'High', 'Open', 'Volume']].values
y = snpdf['Close'].values
lasso_pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('lasso', Lasso())
])
# (The original assigned `lasso_pipe.named_steps['lasso'].shuffle = False`;
# Lasso has no `shuffle` parameter, so the assignment only created an inert
# attribute and has been removed.)
param_grid = {'lasso__alpha': np.logspace(-5, 1, 100)}
# shuffle=False keeps each CV fold chronologically contiguous.
kf = KFold(n_splits=5, shuffle=False)
grid_search = GridSearchCV(lasso_pipe, param_grid, cv=kf, scoring='neg_mean_squared_error')
grid_search.fit(X, y)
print("Best alpha:", grid_search.best_params_['lasso__alpha'])
print("Mean squared error:", -grid_search.best_score_)
# NOTE(review): residuals are computed on the same data the refit model
# saw, so they understate out-of-sample error.
y_pred = grid_search.predict(X)
residuals = y - y_pred
plt.scatter(y_pred, residuals)
plt.xlabel('Predicted values')
plt.ylabel('Residuals')
plt.title('Residual plot')
plt.show()
# --- Compare the tuned Lasso against a naive persistence baseline ---
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0, shuffle=False)
# Persistence baseline: "tomorrow's close equals today's close".
# NOTE(review): np.roll wraps the last test value around to the front, so
# the very first baseline prediction uses a future value — negligible over
# a long test window, but worth knowing.
y_pred = np.roll(y_test, 1)
mse_baseline = mean_squared_error(y_test, y_pred)
grid_search.fit(X_train, y_train)
y_pred = grid_search.predict(X_test)
mse_lasso = mean_squared_error(y_test, y_pred)
# Plot the held-out test MSE just computed; the original plotted the CV
# score (-best_score_) instead and left mse_lasso entirely unused.
plt.bar(['Baseline', 'Lasso'], [mse_baseline, mse_lasso])
plt.ylabel('Mean squared error')
plt.title('Model performance')
plt.show()
# Persist the refit best pipeline for reuse below.
import joblib
joblib.dump(grid_search.best_estimator_, 'lasso_model.pkl')
lasso_model = joblib.load('lasso_model.pkl')
# --- Out-of-time evaluation on fresh data downloaded via yfinance ---
sp500 = yf.download('^GSPC', start='2023-01-23', end='2023-03-31')
del sp500['Adj Close']
# The pipeline was trained on a bare array in the column order
# [Low, High, Open, Volume]; `sp500.drop(['Close'])` yields
# [Open, High, Low, Volume], which silently swapped Open and Low at
# predict time. Select the columns in the training order explicitly.
X_new = sp500[['Low', 'High', 'Open', 'Volume']]
y_pred = lasso_model.predict(X_new)
y_true = sp500['Close'].values
mae = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
print(f"MAE: {mae:.2f}")
print(f"RMSE: {rmse:.2f}")
plt.plot(y_true, label='True')
plt.plot(y_pred, label='Predicted')
plt.xlabel('Day')
plt.ylabel('S&P500 Closing Price')
plt.legend()
plt.show()
Random forest
# --- Random forest regression of Close ---
# NOTE(review): at this point snpdf also carries the Trend column added in
# the linear-regression section (lrdf aliased snpdf), so it becomes a
# feature here — confirm that is intended.
X = snpdf.drop(['Close', 'Date'], axis=1)
y = snpdf['Close']
# Chronological 80/20 split. np.random.seed(40) returns None, so the
# original random_state=np.random.seed(40) silently meant "no seed";
# pass the integer directly (it is ignored when shuffle=False anyway,
# but the intent is now explicit).
X_train_rf, X_test_rf, y_train_rf, y_test_rf = train_test_split(X, y, test_size=0.2, random_state=40, shuffle=False)
# Create and train the forest with a genuinely reproducible seed.
rfmodel = RandomForestRegressor(n_estimators=100, random_state=40)
rfmodel.fit(X_train_rf, y_train_rf)
predictions_rf = rfmodel.predict(X_test_rf)
MSE_rf = mean_squared_error(y_test_rf, predictions_rf)
print('Mean squared error:', MSE_rf)
r2_rf = r2_score(y_test_rf, predictions_rf)
print('R squared:', r2_rf)
plt.figure(figsize=(10, 5))
plt.plot(y_test_rf, color='pink', label='Actual value', alpha=0.5)
plt.plot(predictions_rf, color='blue', label='Predicted value', alpha=0.2)
plt.xlabel('Index')
plt.ylabel('Closing Price')
plt.title('Actual vs. Predicted Values of S&P500')
plt.legend()
plt.show()
# Scatter coloured yellow where the model overshoots the actual value,
# orange where it undershoots.
plt.scatter(y_test_rf, predictions_rf,
            c=['yellow' if actual < pred else 'orange'
               for actual, pred in zip(y_test_rf, predictions_rf)],
            alpha=0.5)
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.title('Actual vs. Predicted Values of S&P500')
plt.show()
# --- Randomised hyper-parameter search for the random forest ---
# max_features='auto' was deprecated and then removed in scikit-learn 1.3;
# it was an alias for "all features", i.e. 1.0.
param_dist = {'n_estimators': randint(50, 500),
              'max_features': [1.0, 'sqrt', 'log2'],
              'max_depth': [10, 20, 30, 40, None],
              'min_samples_split': randint(2, 20),
              'min_samples_leaf': randint(1, 10)}
# np.random.seed(40) returns None, so the original search was unseeded;
# pass the integer so the sampled candidates are reproducible.
random_search = RandomizedSearchCV(rfmodel, param_distributions=param_dist, n_iter=50,
                                   n_jobs=-1, cv=5, random_state=40)
random_search.fit(X_train_rf, y_train_rf)
print('Best hyperparameters:', random_search.best_params_)
# --- Random forest refit with the tuned hyper-parameters ---
# random_state=np.random.seed(40) evaluated to None (np.random.seed
# returns None), making this forest non-reproducible; use the seed itself.
rfmodel_best = RandomForestRegressor(n_estimators=230, max_features='sqrt', max_depth=30,
                                     min_samples_split=10, min_samples_leaf=2, random_state=40)
rfmodel_best.fit(X_train_rf, y_train_rf)
predictions_rfbest = rfmodel_best.predict(X_test_rf)
MSE_rfbest = mean_squared_error(y_test_rf, predictions_rfbest)
print('Mean squared error:', MSE_rfbest)
r2_rfbest = r2_score(y_test_rf, predictions_rfbest)
print('R squared:', r2_rfbest)
plt.figure(figsize=(10, 5))
plt.plot(y_test_rf, color='pink', label='Actual value', alpha=0.5)
plt.plot(predictions_rfbest, color='blue', label='Predicted value', alpha=0.2)
plt.xlabel('Index')
plt.ylabel('Closing Price')
plt.title('Actual vs. Predicted Values of S&P500')
plt.legend()
plt.show()
# Scatter coloured yellow where the model overshoots the actual value,
# orange where it undershoots.
plt.scatter(y_test_rf, predictions_rfbest,
            c=['yellow' if actual < pred else 'orange'
               for actual, pred in zip(y_test_rf, predictions_rfbest)],
            alpha=0.5)
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.title('Actual vs. Predicted Values of S&P500')
plt.show()
LSTM-Long Short Term Memory Neural Network Model
# --- Feature preparation for the LSTM ---
# Work on a copy: the original aliased snpdf, so the Year/Month/Day columns
# added below silently mutated the dataframe shared with earlier sections.
lstmdf = snpdf.copy()
lstmdf['Date'] = pd.to_datetime(lstmdf['Date'])
# Decompose the timestamp into numeric features the network can consume.
lstmdf['Year'] = lstmdf['Date'].dt.year
lstmdf['Month'] = lstmdf['Date'].dt.month
lstmdf['Day'] = lstmdf['Date'].dt.day
lstmdf = lstmdf.drop(columns=['Date'])
print(lstmdf.head())
# Chronological 80/20 split BEFORE scaling: fitting MinMaxScaler on the
# full series (as the original did) leaks the test set's minima/maxima
# into the training data.
train_size = int(len(lstmdf) * 0.8)
scaler = MinMaxScaler()
train_data = scaler.fit_transform(lstmdf.iloc[:train_size])
test_data = scaler.transform(lstmdf.iloc[train_size:])
#Creating sequence
def create_sequence(data, length):
    """Build supervised (window, next-row) pairs from a 2-D array.

    Each sample is `length` consecutive rows of `data`; its target is
    the row immediately after the window. Returns (X, y) as numpy
    arrays of shapes (n, length, n_features) and (n, n_features),
    where n = len(data) - length (empty arrays when data is too short).
    """
    windows = [data[start:start + length] for start in range(len(data) - length)]
    targets = [data[start + length] for start in range(len(data) - length)]
    return np.array(windows), np.array(targets)
# Window length: each sample uses the previous 10 days to predict day 11.
length = 10
X_train, y_train = create_sequence(train_data, length)
X_test, y_test = create_sequence(test_data, length)
# --- LSTM model ---
# `random` here is tensorflow.random (imported at the top of the file);
# seeding makes the weight initialisation reproducible.
random.set_seed(452)
lstmmodel = Sequential()
lstmmodel.add(LSTM(50, input_shape=(length, lstmdf.shape[1])))
# One output unit per feature column. Deriving the width from the
# dataframe replaces the hard-coded Dense(9), which broke silently
# whenever the feature set changed.
lstmmodel.add(Dense(lstmdf.shape[1]))
lstmmodel.compile(loss='mean_squared_error', optimizer='adam')
# Train with the last 10% of the training windows as validation.
history = lstmmodel.fit(X_train, y_train, epochs=15, batch_size=2, validation_split=0.1, verbose=1)
score = lstmmodel.evaluate(X_test, y_test)
# `score` is the test-set MSE in scaled units; the original's
# "(1 - score) * 100" is not a meaningful accuracy for regression.
print("Test MSE (scaled units):", score)
predictions = lstmmodel.predict(X_test)
# Predictions and targets are in scaled [0, 1] units; apply
# scaler.inverse_transform to both if prices in index points are needed.
MSE = mean_squared_error(y_test, predictions)
print("Mean Square Error:", MSE)
plt.figure(figsize=(10, 5))
plt.plot(y_test, color='yellow', label='Actual value', alpha=0.5)
plt.plot(predictions, color='black', label='Predicted value', alpha=0.5)
plt.xlabel('Test sample index')
plt.ylabel('S&P500 features (scaled)')
plt.legend()
plt.show()